package com.k2data.k2de.client;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Created by luoqifei on 2016/12/28.
 */
public class KafkaClient {

    public static final Logger LOGGER = LoggerFactory.getLogger(KafkaClient.class);
    private Producer<String, String> producer;
    private String topicName;
    private Properties props;

    public KafkaClient(String topicName, String serverAddress) throws ExecutionException, InterruptedException {
        this.topicName = topicName;
        props = new Properties();
        //props.put("bootstrap.servers", serverAddress);
        initDefaultProps(serverAddress);
        this.producer = new KafkaProducer<>(props);
    }

    /*private void testKafka() throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<String, String>("", "health test", "health test")).get();
    }*/
    private void initDefaultProps(String serverAddress) {
        props.put("bootstrap.servers", serverAddress);
        //Set acknowledgements for producer requests.
        props.put("acks", "1");
        //If the request fails, the producer can automatically retry,
        props.put("retries", 3);
        //props.put("producer.type", "async");
        //wait for all server broker ack in 20s,if not,return timeout,but not contain the net refuse
        props.put("timeout.ms", 20000);
        //Reduce the no of requests less than 0
        //props.put("linger.ms", 1);

        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        //props.put("buffer.memory", 33554432);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
    }

    /**
     * @deprecated use sendMessageSync or sendMessageAsync
     */
    public void sendMessage(String key, String values) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(topicName, key, values)).get();
    }

    public void sendMessageSync(String key, String values) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(topicName, key, values)).get();

    }

    public void sendMessageAsync(String key, String values) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(topicName, key, values));
    }

    public void close() {
        if (producer != null) {
            LOGGER.info("close producer...");
            producer.close();
        }
    }
}
